package net.corda.node.flows

import co.paralleluniverse.fibers.Suspendable
import net.corda.core.CordaException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.attributes
import net.corda.core.internal.copyToDirectory
import net.corda.core.internal.deleteRecursively
import net.corda.core.internal.hash
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.internal.CheckpointIncompatibleException
import net.corda.node.services.statemachine.Checkpoint
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.DriverDSL
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.NodeParameters
import net.corda.testing.driver.driver
import net.corda.testing.node.internal.assertUncompletedCheckpoints
import net.corda.testing.node.internal.enclosedCordapp
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
import java.nio.file.Path
import kotlin.io.path.createDirectories
import kotlin.io.path.div
import kotlin.io.path.listDirectoryEntries
import kotlin.io.path.readText
import kotlin.test.assertFailsWith

// TraderDemoTest already has a test which checks the node can resume a flow from a checkpoint
/**
 * Driver-based tests which leave a flow suspended at a checkpoint on Bob, stop Bob, change the contents of Bob's
 * `cordapps` directory and then assert that starting Bob again fails with the expected message in his stdout log.
 */
class FlowCheckpointVersionNodeStartupCheckTest {
    companion object {
        val defaultCordapp = enclosedCordapp()
    }

    @Test(timeout=300_000)
    fun `restart node with mismatch between suspended flow and installed CorDapps`() {
        driver(persistentDriverParameters()) {
            val (bob, _) = createSuspendedFlowInBob()
            bob.stop()
            restartBobWithMismatchedCorDapp()
        }
    }

    @Test(timeout=300_000)
    fun `restart node with mismatch between suspended paused flow and installed CorDapps`() {
        driver(persistentDriverParameters()) {
            val (bob, flowId) = createSuspendedFlowInBob()
            // Flip the checkpoint's status to PAUSED directly in the database before Bob is stopped
            val flow = bob.rpc.startFlow(::UpdateStatusToPaused, flowId)
            flow.returnValue.getOrThrow()
            bob.stop()
            restartBobWithMismatchedCorDapp()
        }
    }

    /**
     * The driver configuration shared by every test in this class: out-of-process nodes, no notaries, no CorDapps
     * installed by default and a database that survives node restarts.
     */
    private fun persistentDriverParameters(): DriverParameters {
        return DriverParameters(
                startNodesInProcess = false,
                inMemoryDB = false, // Ensure database is persisted between node restarts so we can keep suspended flows
                cordappsForAllNodes = emptyList(),
                notarySpecs = emptyList(),
                allowHibernateToManageAppSchema = false
        )
    }

    /**
     * Starts Alice and Bob (only Bob gets [defaultCordapp]), stops Alice and then starts [ReceiverFlow] on Bob
     * targeting Alice, so that the flow is left waiting for a message which never arrives.
     *
     * @return Bob's node handle together with the id of the flow that was started on him.
     */
    private fun DriverDSL.createSuspendedFlowInBob(): Pair<NodeHandle, StateMachineRunId> {
        val (alice, bob) = listOf(
                startNode(providedName = ALICE_NAME),
                startNode(NodeParameters(providedName = BOB_NAME, additionalCordapps = listOf(defaultCordapp)))
        ).map { it.getOrThrow() }

        alice.stop() // Stop Alice so that Bob never receives the message

        val flowId = bob.rpc.startFlow(FlowCheckpointVersionNodeStartupCheckTest::ReceiverFlow, alice.nodeInfo.singleIdentity()).id
        // Wait until Bob's flow has started: block until the state machine feed (snapshot first, then live updates)
        // yields its first id
        val feed = bob.rpc.stateMachinesFeed()
        feed.updates
                .map { update -> update.id }
                .startWith(feed.snapshot.map { info -> info.id })
                .toBlocking()
                .first()
        return Pair(bob, flowId)
    }

    /**
     * Runs the two mismatch scenarios against Bob's `cordapps` directory, expecting Bob to fail to start in both:
     * first with the directory emptied, then with a renamed copy of [defaultCordapp] installed in it.
     */
    private fun DriverDSL.restartBobWithMismatchedCorDapp() {
        val cordappsDir = baseDirectory(BOB_NAME) / "cordapps"

        // Test the scenario where the CorDapp no longer exists
        cordappsDir.deleteRecursively()
        cordappsDir.createDirectories()
        assertBobFailsToStartWithLogMessage(
                CheckpointIncompatibleException.CordappNotInstalledException(ReceiverFlow::class.java.name).message
        )

        // Now test the scenario where the CorDapp's hash is different but the flow exists within the jar
        val modifiedCordapp = defaultCordapp.copy(name = "${defaultCordapp.name}-modified")
        assertThat(defaultCordapp.jarFile.hash).isNotEqualTo(modifiedCordapp.jarFile.hash) // Just double-check the hashes are different
        modifiedCordapp.jarFile.copyToDirectory(cordappsDir)
        assertBobFailsToStartWithLogMessage(
                // The part of the log message generated by CheckpointIncompatibleException.FlowVersionIncompatibleException
                "that is incompatible with the current installed version of"
        )
    }

    /**
     * Asserts that starting Bob throws a [CordaException] and that his most recent stdout log contains [logMessage].
     *
     * @param logMessage text which must appear in Bob's stdout log after the failed start.
     */
    private fun DriverDSL.assertBobFailsToStartWithLogMessage(logMessage: String) {
        // Presumably verifies that the single suspended flow is still checkpointed before the restart is attempted
        assertUncompletedCheckpoints(BOB_NAME, 1)

        // NOTE(review): devMode is switched off for the restart, presumably because the checkpoint compatibility
        // check only aborts startup outside of dev mode - confirm against the node implementation
        assertFailsWith<CordaException> {
            startNode(NodeParameters(
                    providedName = BOB_NAME,
                    customOverrides = mapOf("devMode" to false)
            )).getOrThrow()
        }

        assertThat(stdOutLogFile(BOB_NAME).readText()).contains(logMessage)
    }

    /**
     * Returns the most recently created file matching `*stdout.log` in the base directory of the node called [name].
     *
     * @throws NoSuchElementException if the directory contains no matching file.
     */
    private fun DriverDSL.stdOutLogFile(name: CordaX500Name): Path {
        return baseDirectory(name)
                .listDirectoryEntries("*stdout.log")
                .sortedBy { it.attributes().creationTime() }
                .last()
    }

    /** Opens a session with [otherParty] and returns the single [String] it receives from it. */
    @InitiatingFlow
    @StartableByRPC
    class ReceiverFlow(private val otherParty: Party) : FlowLogic<String>() {
        @Suspendable
        override fun call(): String = initiateFlow(otherParty).receive<String>().unwrap { it }
    }

    /** Counterpart of [ReceiverFlow] which sends a fixed greeting back to the initiator. */
    @InitiatedBy(ReceiverFlow::class)
    class SenderFlow(private val otherSide: FlowSession) : FlowLogic<Unit>() {
        @Suspendable
        override fun call() = otherSide.send("Hello!")
    }

    /**
     * Sets the `status` column of the `node_checkpoints` row belonging to flow [id] to the ordinal of
     * [Checkpoint.FlowStatus.PAUSED] using the node's JDBC session.
     */
    @StartableByRPC
    class UpdateStatusToPaused(private val id: StateMachineRunId): FlowLogic<Unit>() {
        @Suspendable
        override fun call() {
            val statement = "Update node_checkpoints set status = ${Checkpoint.FlowStatus.PAUSED.ordinal} where flow_id = '${id.uuid}'"
            // Close the statement once it has run; the connection itself is left alone as it belongs to the node
            serviceHub.jdbcSession().prepareStatement(statement).use { it.execute() }
        }
    }
}
